/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.transaction.core.service.impl;

import com.lvyh.lightframe.transaction.common.config.TransactionApplicationConfig;
import com.lvyh.lightframe.transaction.common.constant.TransactionConstant;
import com.lvyh.lightframe.transaction.common.context.TransactionContext;
import com.lvyh.lightframe.transaction.common.domain.Participant;
import com.lvyh.lightframe.transaction.common.domain.Transaction;
import com.lvyh.lightframe.transaction.common.enums.TransactionEventType;
import com.lvyh.lightframe.transaction.common.enums.TransactionRole;
import com.lvyh.lightframe.transaction.common.enums.TransactionStatus;
import com.lvyh.lightframe.transaction.common.thread.TransactionThreadLocal;
import com.lvyh.lightframe.transaction.core.context.ApplicationContextHolder;
import com.lvyh.lightframe.transaction.core.event.TransactionEventProducer;
import com.lvyh.lightframe.transaction.core.service.TransactionInvocationService;
import com.lvyh.lightframe.transaction.core.service.TransactionLogRepositoryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.lang.reflect.Method;
import java.util.Objects;

@Service
public class TransactionInvocationServiceImpl implements TransactionInvocationService {
    private static final Logger logger = LoggerFactory.getLogger(TransactionInvocationServiceImpl.class);

    @Autowired
    private TransactionLogRepositoryService repositoryService;
    @Autowired
    private TransactionEventProducer transactionEventProducer;

    private TransactionApplicationConfig transactionApplicationConfig;

    /**
     * There may be multiple messages in the message queue. This method will be called every time you hear a message
     */
    @Override
    public void invoke(Participant participant) {

        synchronized (TransactionInvocationServiceImpl.class) {
            //Since the transaction ID is the same for all transaction participants, the transaction object is obtained through the transaction ID first
            Transaction transaction = repositoryService.getById(participant.getTransactionId());

            logger.info("[TransactionInvocationServiceImpl] query transaction,transaction:{}, transactionId:{}", transaction, participant.getTransactionId());
            TransactionThreadLocal.TRANSACTION_THREADLOCAL.set(transaction);

            //If the transaction object does not exist, it means that the RPC request does not normally call the remote method,
            // the AOP does not take effect, and the transaction object is not created locally.
            //In this case, it is possible that the server was down when the RPC was called. After restarting the server, the message was monitored to the message queue,
            // and the method was re invoked through reflection to ensure data consistency.

            if (Objects.isNull(transaction)) {
                //Build a local transaction
                transaction = buildTransaction(participant);
                //If the transaction is null, you need to reset the transaction object for the current thread, otherwise you don't need to reset it
                TransactionThreadLocal.TRANSACTION_THREADLOCAL.set(transaction);
                try {
                    //Transaction participants begin to invoke methods through reflection
                    logger.info("[TransactionInvocationServiceImpl] transaction participants begin to invoke methods through reflection, transactionId:{}", participant.getTransactionId());
                    participantInvoke(participant);

                    //The reflection call succeeded, and the local transaction log status is set to commit
                    transaction.setStatus(TransactionStatus.COMMIT.getCode());
                    logger.info("[TransactionInvocationServiceImpl] reflection invoke transaction success, and local transaction log status is set to commit, transactionId:{}", participant.getTransactionId());

                } catch (Exception e) {
                    e.printStackTrace();
                    logger.error(e.getMessage());
                    //The reflection call failed, the local transaction log status is set to error, and the error message is set
                    transaction.setStatus(TransactionStatus.FAILURE.getCode());
                    transaction.setErrorMessage(e.getMessage());
                } finally {
                    //Regardless of success or failure, this new transaction log needs to be inserted into the database
                    transactionEventProducer.publish(transaction, TransactionEventType.INSERT.getCode());
                    logger.info("[TransactionInvocationServiceImpl] regardless of success or failure, this transaction log needs to be inserted into the database, transactionId:{}", participant.getTransactionId());
                }

            } else if (Objects.equals(transaction.getStatus(), TransactionStatus.FAILURE.getCode())) {

                /**
                 * Otherwise, the transaction object exists, indicating that the RPC request is normally called to the remote method.
                 * Before calling the method, the transaction object was created locally through AOP
                 * So, in order to ensure idempotency, it can't be called once before it is called, otherwise it is equivalent to a RPC request, calling two remote methods.
                 * However, you need to judge whether the transaction is wrong. If there is an error, you need to try the reflection calling method again
                 */

                //First, determine whether the number of retries of the error transaction log exceeds the configured number
                if (transaction.getRetriedCount() > TransactionConstant.RETRIED_COUNT) {
                    logger.error(participant.getDestination() + "的服务：" + participant.getTargetClass().getName() + "." + participant.getMethodName() + "对应的事务ID为：" + participant.getTransactionId() + "事务重试次数" + transaction.getRetriedCount() + "超过了设置的重试次数，需要人工处理");
                    return;
                }
                try {
                    transaction.setRetriedCount(transaction.getRetriedCount() + 1);
                    //Start retrying method calls through reflection
                    participantInvoke(participant);
                    //The reflection call succeeded, and the local transaction log status is set to commit
                    transaction.setStatus(TransactionStatus.COMMIT.getCode());
                    logger.info("[TransactionInvocationServiceImpl] transaction participants retry execute and local transaction status is successful, transactionId:{}", participant.getTransactionId());

                } catch (Exception e) {
                    logger.error(e.getMessage());
                } finally {
                    //This transaction log needs to be updated regardless of success or failure
                    transactionEventProducer.publish(transaction, TransactionEventType.UPDATE.getCode());
                }
            } else {
                logger.info("[TransactionInvocationServiceImpl] transaction participants status is success and needs to skip execution, transactionId:{}", participant.getTransactionId());
            }
        }
    }

    @Override
    public void setTransactionConfig(final TransactionApplicationConfig transactionApplicationConfig) {
        this.transactionApplicationConfig = transactionApplicationConfig;
    }

    private void participantInvoke(Participant participant) throws Exception {
        //Before calling the method through reflection, set the transaction context object for the current thread. The transaction ID is the ID of the participant. Ensure that all participants are the same transaction and the role is local "local"
        //The role local is used to enter AOP without initiating new transactions, otherwise there will be an additional transaction log.
        TransactionContext transactionContext = new TransactionContext();
        transactionContext.setTransactionId(participant.getTransactionId());
        transactionContext.setRole(TransactionRole.LOCAL.getCode());

        TransactionThreadLocal.CONTEXT_THREADLOCAL.set(transactionContext);
        Class clazz = participant.getTargetClass();
        Method method = clazz.getMethod(participant.getMethodName(), participant.getParameterTypes());
        Object bean = ApplicationContextHolder.getBean(clazz);

        method.invoke(bean, participant.getArguments());
    }

    /**
     * Build a transaction log
     */
    private Transaction buildTransaction(Participant participant) {
        Transaction transaction = new Transaction();
        transaction.setTargetMethod(participant.getMethodName());
        transaction.setTargetClass(participant.getTargetClass().getName());
        transaction.setRole(TransactionRole.ACTOR.getCode());
        transaction.setTransactionId(participant.getTransactionId());
        transaction.setStatus(TransactionStatus.BEGIN.getCode());
        return transaction;
    }
}
